原文作者:翟开顺
首发:CSDN
本人仅为自己方便查阅做了摘抄,请支持原作者
原文地址:https://blog.csdn.net/t1dmzks/article/details/72077428
键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin
github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8
先从spark-learning中的一张图大致了解其功能
subtractByKey
函数定义1
2
3
4
5def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]
类似于subtrac,删掉 RDD 中键与 other RDD 中的键相同的元素
join
函数定义1
2
3
4
5def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
RDD1.join(RDD2)
可以把RDD1,RDD2中的相同的key给连接起来,类似于sql中的join操作
fullOuterJoin
和join类似,不过这是全连接
leftOuterJoin
1 | def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] |
直接看图即可
对两个 RDD 进行连接操作,类似于sql中的左外连接
rightOuterJoin
对两个 RDD 进行连接操作,类似于sql中的右外连接,存在的话,value用的Some, 不存在用的None,具体的看上面的图和下面的代码即可
代码示例
scala语言1
2
3
4
5
6
7
8
9
10
11
12
13
14scala> val rdd = sc.makeRDD(Array((1,2),(3,4),(3,6)))
scala> val other = sc.makeRDD(Array((3,9)))
scala> rdd.subtractByKey(other).collect()
res0: Array[(Int, Int)] = Array((1,2))
scala> rdd.join(other).collect()
res1: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))
scala> rdd.leftOuterJoin(other).collect()
res2: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))
scala> rdd.rightOuterJoin(other).collect()
res3: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))
java语言1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;
public class JoinRDD {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("ReduceByKey").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("WARN");
JavaRDD<Tuple2<Integer,Integer>> rddPre = sc.parallelize(Arrays.asList(new Tuple2(1,2)
, new Tuple2(3,4)
, new Tuple2(3,6)));
JavaRDD<Tuple2<Integer,Integer>> otherPre = sc.parallelize(Arrays.asList(new Tuple2(3,10),new Tuple2(4,8)));
//JavaRDD转换成JavaPairRDD
JavaPairRDD<Integer, Integer> rdd = JavaPairRDD.fromJavaRDD(rddPre);
JavaPairRDD<Integer, Integer> other = JavaPairRDD.fromJavaRDD(otherPre);
//subtractByKey
JavaPairRDD<Integer, Integer> subRDD = rdd.subtractByKey(other);
//join
JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = rdd.join(other);
//fullOutJoin
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinRDD = rdd.fullOuterJoin(other);
//leftOuterJoin
JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinRDD = rdd.leftOuterJoin(other);
//rightOutJoin
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinRDD = rdd.rightOuterJoin(other);
//输出看效果
Map<Integer, Integer> subMap = subRDD.collectAsMap();
System.out.println("-------------subRDD-------------");
for (Integer key : subMap.keySet()) {
System.out.println("subRDD: "+key+", "+subMap.get(key));
}
Map<Integer, Tuple2<Integer, Integer>> joinMap = joinRDD.collectAsMap();
System.out.println("-------------joinRDD-------------");
for (Integer key : joinMap.keySet()) {
System.out.println("join: "+key+", Tuple("+joinMap.get(key)._1+","+joinMap.get(key)._2+")");
}
Map<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinMap = fullOutJoinRDD.collectAsMap();
System.out.println("-------------fullOutJoinRDD-------------");
for (Integer key : fullOutJoinMap.keySet()) {
System.out.println("fullOutJoinRDD: "+key+", Tuple("+fullOutJoinMap.get(key)._1+","+fullOutJoinMap.get(key)._2+")");
}
Map<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinMap = leftOutJoinRDD.collectAsMap();
System.out.println("-------------leftOutJoinRDD-------------");
for (Integer key : leftOutJoinMap.keySet()) {
System.out.println("leftOutJoinRDD: "+key+", Tuple("+leftOutJoinMap.get(key)._1+","+leftOutJoinMap.get(key)._2+")");
}
Map<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinMap = rightOutJoinRDD.collectAsMap();
System.out.println("-------------rightOutJoinRDD-------------");
for (Integer key : rightOutJoinMap.keySet()) {
System.out.println("rightOutJoinRDD: "+key+", Tuple("+rightOutJoinMap.get(key)._1+","+rightOutJoinMap.get(key)._2+")");
}
}
}
运行后显示1
2
3
4
5
6
7
8
9
10
11
12
13
14-------------subRDD-------------
subRDD: 1, 2
-------------joinRDD-------------
join: 3, Tuple(6,10)
-------------fullOutJoinRDD-------------
fullOutJoinRDD: 4, Tuple(Optional.empty,Optional[8])
fullOutJoinRDD: 1, Tuple(Optional[2],Optional.empty)
fullOutJoinRDD: 3, Tuple(Optional[6],Optional[10])
-------------leftOutJoinRDD-------------
leftOutJoinRDD: 1, Tuple(2,Optional.empty)
leftOutJoinRDD: 3, Tuple(6,Optional[10])
-------------rightOutJoinRDD-------------
rightOutJoinRDD: 4, Tuple(Optional.empty,8)
rightOutJoinRDD: 3, Tuple(Optional[6],10)